Crate object_store

§object_store

This crate provides a uniform API for interacting with object storage services and local files via the ObjectStore trait.

Using this crate, the same binary and code can run in multiple clouds and local test environments, via a simple runtime configuration change.

§Highlights

  1. A high-performance async API focused on providing a consistent interface mirroring that of object stores such as S3

  2. Production quality, with use in large-scale production systems such as crates.io and InfluxDB IOx

  3. Support for advanced functionality, including atomic, conditional reads and writes, vectored IO, bulk deletion, and more…

  4. Stable and predictable governance via the Apache Arrow project

  5. Small dependency footprint, depending on only a small number of common crates

Originally developed by InfluxData and subsequently donated to Apache Arrow.

§Available ObjectStore Implementations

By default, this crate provides the following implementations:

Feature flags are used to enable support for other implementations:

§Why not a Filesystem Interface?

The ObjectStore interface is designed to mirror the APIs of object stores rather than filesystems, and thus has stateless APIs instead of the cursor-based interfaces, such as Read or Seek, found in filesystems.

This design provides the following advantages:

  • All operations are atomic, and readers cannot observe partial and/or failed writes
  • Methods map directly to object store APIs, providing both efficiency and predictability
  • Abstracts away filesystem and operating system specific quirks, ensuring portability
  • Allows for functionality not native to filesystems, such as operation preconditions and atomic multipart uploads

This crate does provide BufReader and BufWriter adapters, which offer a more filesystem-like API for working with the ObjectStore trait. However, they should be used with care.

§Adapters

ObjectStore instances can be composed with various adapters which add additional functionality:

§Configuration System

This crate provides a configuration system inspired by the APIs exposed by fsspec, PyArrow FileSystem, and Hadoop FileSystem. It allows a DynObjectStore to be created from a URL and an optional list of key-value pairs. This provides a flexible interface that supports a wide variety of user-defined store configurations with minimal additional application complexity.

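use object_store::aws::{AmazonS3, AmazonS3Builder};
use object_store::{parse_url, parse_url_opts};
use url::Url;

// A specific store variant can be created manually using the appropriate builder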
let store: AmazonS3 = AmazonS3Builder::from_env()
    .with_bucket_name("my-bucket").build().unwrap();

// Alternatively can create an ObjectStore from an S3 URL
let url = Url::parse("s3://bucket/path").unwrap();
let (store, path) = parse_url(&url).unwrap();
assert_eq!(path.as_ref(), "path");

// Potentially with additional options
let (store, path) = parse_url_opts(&url, vec![("aws_access_key_id", "...")]).unwrap();

// Or with URLs that encode the bucket name in the URL path
let url = Url::parse("https://ACCOUNT_ID.r2.cloudflarestorage.com/bucket/path").unwrap();
let (store, path) = parse_url(&url).unwrap();
assert_eq!(path.as_ref(), "path");
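
To make the URL-to-store mapping more concrete, the following is a minimal, standard-library-only sketch of how a URL might be split into a store kind, a bucket (or authority) and an object path. `StoreKind` and `split_store_url` are hypothetical names invented for this illustration, the scheme table is a small illustrative subset, and none of this is the crate's implementation; real code should rely on parse_url and a proper URL parser.

```rust
/// Hypothetical store kinds, for illustration only.
#[derive(Debug, PartialEq)]
enum StoreKind {
    S3,
    Gcs,
    Local,
    Memory,
}

/// Splits `scheme://authority/path` into (kind, authority, path).
/// Returns None for malformed input or an unrecognised scheme.
fn split_store_url(url: &str) -> Option<(StoreKind, &str, &str)> {
    let (scheme, rest) = url.split_once("://")?;
    let kind = match scheme {
        "s3" => StoreKind::S3,
        "gs" => StoreKind::Gcs,
        "file" => StoreKind::Local,
        "memory" => StoreKind::Memory,
        _ => return None,
    };
    // Everything up to the first '/' is the bucket; the remainder is the object path.
    let (authority, path) = rest.split_once('/').unwrap_or((rest, ""));
    Some((kind, authority, path))
}
```

With this sketch, `split_store_url("s3://bucket/path")` yields the S3 kind, the bucket `bucket` and the path `path`, mirroring the shape of the `(store, path)` pair shown above.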

§List objects

Use the ObjectStore::list method to iterate over objects in remote storage or files in the local filesystem:

// Create an ObjectStore (`get_object_store` is a helper that is not shown here)
let object_store: Arc<dyn ObjectStore> = get_object_store();

// Recursively list all files below the 'data' path.
// 1. On AWS S3 this would be the 'data/' prefix
// 2. On a local filesystem, this would be the 'data' directory
let prefix = Path::from("data");

// Get an `async` stream of `ObjectMeta` entries:
let mut list_stream = object_store.list(Some(&prefix));

// Print a line about each object
while let Some(meta) = list_stream.next().await.transpose().unwrap() {
    println!("Name: {}, size: {}", meta.location, meta.size);
}

Which will print out something like the following:

Name: data/file01.parquet, size: 112832
Name: data/file02.parquet, size: 143119
Name: data/child/file03.parquet, size: 100
...
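
The prefix semantics in the listing above can be sketched with the standard library alone. The example below assumes that a prefix matches whole path segments, so `data` matches `data/file01.parquet` but not `database/other`, and that only objects strictly below the prefix are returned. `has_segment_prefix` and `list_with_prefix` are hypothetical helpers over an in-memory map, not part of the crate.

```rust
use std::collections::BTreeMap;

/// Returns true if `key` lies below `prefix`, comparing whole path segments.
fn has_segment_prefix(key: &str, prefix: &str) -> bool {
    if prefix.is_empty() {
        return true;
    }
    match key.strip_prefix(prefix) {
        // The remainder must begin a new segment, so "data" does not match "database/x".
        Some(rest) => rest.starts_with('/'),
        None => false,
    }
}

/// Lists (key, size) pairs below `prefix`, in key order.
fn list_with_prefix(objects: &BTreeMap<String, Vec<u8>>, prefix: &str) -> Vec<(String, usize)> {
    objects
        .iter()
        .filter(|(key, _)| has_segment_prefix(key, prefix))
        .map(|(key, data)| (key.clone(), data.len()))
        .collect()
}
```

Because the map is ordered, results come back sorted by key. This ordering is a property of the sketch only and should not be assumed of any real object store.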

§Fetch objects

Use the ObjectStore::get method to fetch the data bytes from remote storage or files in the local filesystem as a stream.

// Create an ObjectStore
let object_store: Arc<dyn ObjectStore> = get_object_store();

// Retrieve a specific file
let path = Path::from("data/file01.parquet");

// Fetch just the file metadata
let meta = object_store.head(&path).await.unwrap();
println!("{meta:?}");

// Fetch the object including metadata
let result: GetResult = object_store.get(&path).await.unwrap();
assert_eq!(result.meta, meta);

// Buffer the entire object in memory
let object: Bytes = result.bytes().await.unwrap();
assert_eq!(object.len(), meta.size);

// Alternatively stream the bytes from object storage
let stream = object_store.get(&path).await.unwrap().into_stream();

// Count the zero bytes using `try_fold` from the `TryStreamExt` trait
let num_zeros = stream
    .try_fold(0, |acc, bytes| async move {
        Ok(acc + bytes.iter().filter(|b| **b == 0).count())
    }).await.unwrap();

println!("Num zeros in {} is {}", path, num_zeros);

§Put Object

Use the ObjectStore::put method to atomically write data.

let object_store: Arc<dyn ObjectStore> = get_object_store();
let path = Path::from("data/file1");
let payload = PutPayload::from_static(b"hello");
object_store.put(&path, payload).await.unwrap();

§Multipart Upload

Use the ObjectStore::put_multipart method to atomically write a large amount of data.

let object_store: Arc<dyn ObjectStore> = get_object_store();
let path = Path::from("data/large_file");
let upload = object_store.put_multipart(&path).await.unwrap();
let mut write = WriteMultipart::new(upload);
write.write(b"hello");
write.finish().await.unwrap();
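
What "atomically" means for a multipart write can be illustrated with a small in-memory model: parts are staged under an upload id and only become a visible object when the upload is completed. This is a sketch under stated assumptions, not how any particular store implements multipart uploads, and `StagingStore` with its methods is hypothetical.

```rust
use std::collections::HashMap;

/// Hypothetical store that stages multipart uploads separately from visible objects.
#[derive(Default)]
struct StagingStore {
    objects: HashMap<String, Vec<u8>>,
    uploads: HashMap<u64, (String, Vec<Vec<u8>>)>,
    next_id: u64,
}

impl StagingStore {
    /// Begins an upload for `path` and returns its id.
    fn start_upload(&mut self, path: &str) -> u64 {
        let id = self.next_id;
        self.next_id += 1;
        self.uploads.insert(id, (path.to_string(), Vec::new()));
        id
    }

    /// Stages one part; returns false if the upload id is unknown.
    fn put_part(&mut self, id: u64, part: &[u8]) -> bool {
        match self.uploads.get_mut(&id) {
            Some((_, parts)) => {
                parts.push(part.to_vec());
                true
            }
            None => false,
        }
    }

    /// Publishes all staged parts as a single object in one step.
    fn complete(&mut self, id: u64) -> bool {
        match self.uploads.remove(&id) {
            Some((path, parts)) => {
                self.objects.insert(path, parts.concat());
                true
            }
            None => false,
        }
    }

    /// Reads a visible object; staged parts are never observable here.
    fn get(&self, path: &str) -> Option<&[u8]> {
        self.objects.get(path).map(|v| v.as_slice())
    }
}
```

In this model a reader calling `get` before `complete` sees no object at all, never a partial one.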

§Vectored Read

A common pattern, especially when reading structured datasets, is fetching multiple, potentially non-contiguous, ranges of a particular object.

ObjectStore::get_ranges provides an efficient way to perform such vectored IO, and will automatically coalesce adjacent ranges into an appropriate number of parallel requests.

let object_store: Arc<dyn ObjectStore> = get_object_store();
let path = Path::from("data/large_file");
let ranges = object_store.get_ranges(&path, &[90..100, 400..600, 0..10]).await.unwrap();
assert_eq!(ranges.len(), 3);
assert_eq!(ranges[0].len(), 10);
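
The coalescing idea can be sketched as follows: sort the requested ranges, then merge any range that starts within a chosen gap of the previous one, so that fewer and larger requests are issued. This is one possible approach rather than the crate's implementation; `coalesce` and its `max_gap` parameter are hypothetical.

```rust
use std::ops::Range;

/// Merges ranges whose gap to the previous range is at most `max_gap` bytes.
/// Returns sorted, non-overlapping ranges that cover every requested range.
fn coalesce(ranges: &[Range<usize>], max_gap: usize) -> Vec<Range<usize>> {
    let mut sorted: Vec<Range<usize>> = ranges.to_vec();
    sorted.sort_by_key(|r| r.start);

    let mut merged: Vec<Range<usize>> = Vec::new();
    for r in sorted {
        if let Some(last) = merged.last_mut() {
            // Close enough to the previous fetch: extend it instead of starting a new one.
            if r.start <= last.end.saturating_add(max_gap) {
                last.end = last.end.max(r.end);
                continue;
            }
        }
        merged.push(r);
    }
    merged
}
```

For the ranges used above and a gap of 100 bytes, `0..10` and `90..100` merge into `0..100` while `400..600` stays separate. A caller would then slice the fetched buffers back into the originally requested ranges.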

§Vectored Write

When writing data it is often the case that the size of the output is not known ahead of time.

A common approach to handling this is to bump-allocate a Vec, whereby the underlying allocation is repeatedly reallocated, each time doubling the capacity. The performance of this is suboptimal as reallocating memory will often involve copying it to a new location.

Fortunately, as PutPayload does not require memory regions to be contiguous, it is possible to instead allocate memory in chunks and avoid bump allocating. PutPayloadMut encapsulates this approach.

let object_store: Arc<dyn ObjectStore> = get_object_store();
let path = Path::from("data/large_file");
let mut buffer = PutPayloadMut::new().with_block_size(8192);
for _ in 0..22 {
    buffer.extend_from_slice(&[0; 1024]);
}
let payload = buffer.freeze();

// Payload consists of 3 separate chunks: two full 8 KiB blocks and a final 6 KiB block
assert_eq!(payload.as_ref().len(), 3);
assert_eq!(payload.as_ref()[0].len(), 8192);
assert_eq!(payload.as_ref()[1].len(), 8192);
assert_eq!(payload.as_ref()[2].len(), 6144);

object_store.put(&path, payload).await.unwrap();
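
The chunked allocation strategy can be sketched independently of the crate. The buffer below appends into fixed-capacity blocks and starts a new block when the current one is full, so existing data is never copied to a larger allocation. `ChunkedBuffer` is a hypothetical type for illustration and does not claim to match the internals of PutPayloadMut.

```rust
/// Append-only buffer that stores data in fixed-capacity blocks instead of one growing Vec.
struct ChunkedBuffer {
    block_size: usize,
    blocks: Vec<Vec<u8>>,
}

impl ChunkedBuffer {
    fn new(block_size: usize) -> Self {
        assert!(block_size > 0, "block size must be non-zero");
        Self { block_size, blocks: Vec::new() }
    }

    /// Appends `data`, spilling into new blocks as each one fills up.
    fn extend_from_slice(&mut self, mut data: &[u8]) {
        while !data.is_empty() {
            // Start a new block when there is none yet or the last one is full.
            let needs_block = self.blocks.last().map_or(true, |b| b.len() == self.block_size);
            if needs_block {
                self.blocks.push(Vec::with_capacity(self.block_size));
            }
            let block_size = self.block_size;
            let last = self.blocks.last_mut().expect("a block was just ensured");
            // Copy only as much as fits in the remaining capacity of the current block.
            let take = (block_size - last.len()).min(data.len());
            last.extend_from_slice(&data[..take]);
            data = &data[take..];
        }
    }

    /// Lengths of the blocks written so far.
    fn block_lens(&self) -> Vec<usize> {
        self.blocks.iter().map(Vec::len).collect()
    }
}
```

Writing 22 slices of 1024 bytes with a block size of 8192 produces blocks of 8192, 8192 and 6144 bytes, the same shape as the payload in the example above.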

§Conditional Fetch

More complex object retrieval can be supported by ObjectStore::get_opts.

For example, a cache can be refreshed efficiently without re-fetching the entire object data when the object hasn’t been modified.

struct CacheEntry {
    /// Data returned by last request
    data: Bytes,
    /// ETag identifying the object returned by the server
    e_tag: String,
    /// Instant of last refresh
    refreshed_at: Instant,
}

/// Example cache that checks entries after 10 seconds for a new version
struct Cache {
    entries: HashMap<Path, CacheEntry>,
    store: Arc<dyn ObjectStore>,
}

impl Cache {
    pub async fn get(&mut self, path: &Path) -> Result<Bytes> {
        Ok(match self.entries.get_mut(path) {
            Some(e) => match e.refreshed_at.elapsed() < Duration::from_secs(10) {
                true => e.data.clone(), // Return cached data
                false => { // Check if remote version has changed
                    let opts = GetOptions {
                        if_none_match: Some(e.e_tag.clone()),
                        ..GetOptions::default()
                    };
                    match self.store.get_opts(path, opts).await {
                        Ok(d) => e.data = d.bytes().await?,
                        Err(Error::NotModified { .. }) => {} // Data has not changed
                        Err(e) => return Err(e),
                    };
                    e.refreshed_at = Instant::now();
                    e.data.clone()
                }
            },
            None => { // Not cached, fetch data
                let get = self.store.get(path).await?;
                let e_tag = get.meta.e_tag.clone();
                let data = get.bytes().await?;
                if let Some(e_tag) = e_tag {
                    let entry = CacheEntry {
                        e_tag,
                        data: data.clone(),
                        refreshed_at: Instant::now(),
                    };
                    self.entries.insert(path.clone(), entry);
                }
                data
            }
        })
    }
}
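
The server side of this exchange can be modelled in a few lines, which may help clarify what the cache relies on. The sketch below assumes a single object whose ETag changes on every write; `Server`, `Fetch` and the `v{n}` ETag format are hypothetical and chosen only for illustration.

```rust
/// Outcome of a conditional read in this sketch.
#[derive(Debug, PartialEq)]
enum Fetch {
    /// The caller's ETag still matches, so no data is transferred.
    NotModified,
    /// The object differs from the caller's copy; new data and ETag are returned.
    Modified { data: Vec<u8>, e_tag: String },
}

/// Hypothetical single-object server that bumps its ETag on every write.
struct Server {
    data: Vec<u8>,
    generation: u64,
}

impl Server {
    fn new(data: &[u8]) -> Self {
        Self { data: data.to_vec(), generation: 0 }
    }

    fn e_tag(&self) -> String {
        format!("v{}", self.generation)
    }

    fn write(&mut self, data: &[u8]) {
        self.data = data.to_vec();
        self.generation += 1;
    }

    /// Returns the object unless the caller already holds the current version.
    fn get_if_none_match(&self, e_tag: Option<&str>) -> Fetch {
        if e_tag == Some(self.e_tag().as_str()) {
            return Fetch::NotModified;
        }
        Fetch::Modified { data: self.data.clone(), e_tag: self.e_tag() }
    }
}
```

A client that passes its last known ETag pays for a full transfer only when the object has actually changed, which is the property the cache above depends on.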

§Conditional Put

The default behaviour when writing data is to upsert any existing object at the given path, overwriting any previous value. More complex behaviours can be achieved using PutMode, and can be used to build Optimistic Concurrency Control based transactions. This facilitates building metadata catalogs, such as Apache Iceberg or Delta Lake, directly on top of object storage, without relying on a separate DBMS.

let store = get_object_store();
let path = Path::from("test");

// Perform a conditional update on path
loop {
    // Perform get request
    let r = store.get(&path).await.unwrap();

    // Save version information fetched
    let version = UpdateVersion {
        e_tag: r.meta.e_tag.clone(),
        version: r.meta.version.clone(),
    };

    // Compute new version of object contents
    let new = do_update(r.bytes().await.unwrap());

    // Attempt to commit transaction
    match store.put_opts(&path, new.into(), PutMode::Update(version).into()).await {
        Ok(_) => break, // Successfully committed
        Err(Error::Precondition { .. }) => continue, // Object has changed, try again
        Err(e) => panic!("{e}")
    }
}
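
The optimistic concurrency pattern behind this loop can be shown with an in-memory model. The sketch below assumes a store that keeps a version counter per key and rejects a write when the caller's expected version is stale; `VersionedStore`, `PutError` and `increment` are hypothetical names and do not reflect the crate's types.

```rust
use std::collections::HashMap;

#[derive(Debug, PartialEq)]
enum PutError {
    /// The stored version no longer matches the version the caller read.
    Precondition,
}

/// Hypothetical in-memory store that tracks a version counter per key.
#[derive(Default)]
struct VersionedStore {
    objects: HashMap<String, (u64, Vec<u8>)>,
}

impl VersionedStore {
    fn get(&self, key: &str) -> Option<(u64, Vec<u8>)> {
        self.objects.get(key).cloned()
    }

    /// Writes `data` only if the current version equals `expected`
    /// (`None` means the key must not exist yet). Returns the new version.
    fn put_if_version(
        &mut self,
        key: &str,
        expected: Option<u64>,
        data: Vec<u8>,
    ) -> Result<u64, PutError> {
        let current = self.objects.get(key).map(|(v, _)| *v);
        if current != expected {
            return Err(PutError::Precondition);
        }
        let next = current.map_or(0, |v| v + 1);
        self.objects.insert(key.to_string(), (next, data));
        Ok(next)
    }
}

/// Read-modify-write loop that retries until the conditional write succeeds.
fn increment(store: &mut VersionedStore, key: &str) -> u64 {
    loop {
        let (expected, value) = match store.get(key) {
            Some((version, data)) => (Some(version), data.first().copied().unwrap_or(0)),
            None => (None, 0),
        };
        match store.put_if_version(key, expected, vec![value.wrapping_add(1)]) {
            Ok(version) => return version,
            Err(PutError::Precondition) => continue, // Someone else wrote first, retry
        }
    }
}
```

In this single-threaded sketch the loop never actually retries. With concurrent writers, a losing writer would observe `Precondition`, re-read the latest version and try again, which is the behaviour the loop above implements against a real store.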

§TLS Certificates

Stores that use HTTPS/TLS (this is true for most cloud stores) can choose the source of their CA certificates. By default the system-bundled certificates are used (see rustls-native-certs). The tls-webpki-roots feature switch can be used to also bundle Mozilla’s root certificates with the library/application (see webpki-roots).

Modules§

  • An object store implementation for S3
  • An object store implementation for Azure blob storage
  • Utilities for performing tokio-style buffered IO
  • A ChunkedStore that can be used to test streaming behaviour
  • Utility for streaming newline delimited files from object storage
  • An object store implementation for Google Cloud Storage
  • An object store implementation for generic HTTP servers
  • Integration tests for custom object store implementations
  • An object store that limits the maximum concurrency of the wrapped implementation
  • An object store implementation for a local filesystem
  • An in-memory object store implementation
  • Cloud Multipart Upload
  • Path abstraction for Object Storage
  • An object store wrapper handling a constant path prefix
  • Abstraction of signed URL generation for those object store implementations that support it
  • A throttling object store wrapper

Type Aliases§

  • An alias for a dynamically dispatched object store implementation.
  • Id type for multipart uploads.
  • A specialized Result for object store-related errors
  • An upload part request